package com.gin.monitor.agg.connector.kafka;

import com.gin.monitor.agg.connector.kafka.util.TrafficInfoDeserializationSchema;
import com.gin.monitor.agg.vo.TrafficInfo;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;

import java.util.Objects;
import java.util.Properties;

/**
 * Static helpers for creating Flink Kafka consumers that emit {@link TrafficInfo} records.
 *
 * <p>Manual testing notes kept from the original author:
 *
 * <p>Start Kafka:
 * <pre>
 * bin/kafka-server-start.sh -daemon config/server.properties
 * </pre>
 *
 * <p>Connect to the broker list and send a message to the Kafka topic named {@code config}:
 * <pre>
 * cd /opt/software/kafka_2.11-2.0.0/bin
 * ./kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic config
 * &gt;001 Beijing
 * </pre>
 *
 * <p>Consume to verify:
 * <pre>
 * cd /opt/software/kafka_2.11-2.0.0/bin
 * ./kafka-console-consumer.sh --bootstrap-server node01:9092,node02:9092,node03:9092 --topic config --group group02 --property print.key=true --property print.value=true --property key.separator=,
 * </pre>
 *
 * @author gin
 * @date 2021/2/20
 */
public final class FlinkKafkaUtils {

    /** Topic used by {@link #getKafkaConsumer()} when the caller does not name one. */
    public static final String FLINK_KAFKA_TEST_TOPIC = "t_traffic_monitor";
    /** Property key under which the consumer group id is stored. */
    public static final String KAFKA_GROUP_ID_KEY = "group.id";
    /** Consumer group id placed into the properties built by this class. */
    public static final String KAFKA_GROUP_ID_VAL = "traffic-group";
    /** Broker address placed into the properties built by this class. */
    public static final String BOOTSTRAP_SERVERS_CONFIG_VAL = "node01:9092";

    /** Not instantiable: this class only exposes static helpers and constants. */
    private FlinkKafkaUtils() {
    }

    /**
     * Builds a fresh {@link Properties} instance holding the Kafka connection settings.
     *
     * <p>Despite the method name, no (de)serializer properties are set here; only the
     * bootstrap servers and the group id are populated. Deserialization is supplied
     * separately by the schema passed to the consumer constructor in
     * {@link #getKafkaConsumer(String)}.
     *
     * @return a new, caller-owned {@code Properties} object on every call
     */
    public static Properties getKafkaPropsWithDeSerializer() {
        Properties props = new Properties();
        // Original author's note: Kafka versions before 0.10 were configured with a
        // ZooKeeper URL; later versions use bootstrap.servers.
        // NOTE(review): a producer-side constant is used to configure a consumer here;
        // the key is presumably identical ("bootstrap.servers") on both sides — confirm,
        // and consider switching to the consumer-side constant.
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                BOOTSTRAP_SERVERS_CONFIG_VAL);
        props.setProperty(KAFKA_GROUP_ID_KEY, KAFKA_GROUP_ID_VAL);
        return props;
    }

    /**
     * Creates a consumer for the default topic {@link #FLINK_KAFKA_TEST_TOPIC}.
     *
     * @return a new consumer configured via {@link #getKafkaPropsWithDeSerializer()}
     */
    public static FlinkKafkaConsumer<TrafficInfo> getKafkaConsumer() {
        // Delegate so both overloads share a single construction path.
        return getKafkaConsumer(FLINK_KAFKA_TEST_TOPIC);
    }

    /**
     * Creates a consumer for the given topic, decoding records with
     * {@code TrafficInfoDeserializationSchema}.
     *
     * @param topic name of the Kafka topic to read from; must not be {@code null}
     * @return a new consumer configured via {@link #getKafkaPropsWithDeSerializer()}
     * @throws NullPointerException if {@code topic} is {@code null}
     */
    public static FlinkKafkaConsumer<TrafficInfo> getKafkaConsumer(String topic) {
        // Fail at the call site with a clear message rather than handing null to the connector.
        Objects.requireNonNull(topic, "topic");
        return new FlinkKafkaConsumer<>(topic,
            new TrafficInfoDeserializationSchema(), getKafkaPropsWithDeSerializer());
    }

}
